
Conversation


marianob-span1 commented Oct 3, 2025

We have a data pipeline in which Airbyte reads data from different sources and is scheduled and orchestrated by Dagster (via the Airbyte API), so we think it makes sense for us to give PyAirbyte a try and run our connectors directly within Dagster.

However, we only use Airbyte to read data from sources and load it into raw tables, always appending, without any deduplication or normalization; that is handled later by Dagster. In this setup the Postgres destination connector writes all the data it receives to a jsonb column named _airbyte_data.

PyAirbyte, by contrast, uses one column per attribute. The main issue with this approach is that it relies heavily on the quality of the connector schema: any small discrepancy between the schema and the incoming data prevents it from writing data into the destination table. There are many connectors where the type of a schema field has not been defined, so Airbyte assumes string, which often does not match the actual data being read.

This PR introduces a flag to enable that behavior. It is intended only for appending data, since everything, even the primary keys, ends up inside that jsonb column, and it should not break any existing feature (hopefully!).

I have tested it with PostgreSQL and added tests for both PostgreSQL and DuckDB. Happy to extend this to other engines if needed.
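
To illustrate, here is a minimal sketch of how this mode could be enabled from the caller's side (the flag name follows this PR; the connector, credentials, and config values are placeholders):

import airbyte as ab
from airbyte.caches import PostgresCache

# Sketch only: enable the new flag so record properties are packed into a
# single _airbyte_data JSONB column instead of one column per attribute.
cache = PostgresCache(
    host="localhost",
    port=5432,
    username="airbyte",
    password="secret",  # placeholder credentials
    database="warehouse",
    use_single_json_column=True,  # flag introduced in this PR
)

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()

# Only append-style writes are supported in this mode ("auto" resolves to append).
result = source.read(cache, write_strategy="append")
print(result.processed_records)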

Summary by CodeRabbit

  • New Features

    • Added optional single JSON column mode that consolidates record properties into a single _airbyte_data column while preserving internal columns.
    • Supported on PostgreSQL (stored as JSONB) and DuckDB.
    • Available only with APPEND or AUTO write strategies; unsupported strategies return a clear error.
  • Tests

    • Introduced end-to-end tests validating single JSON column mode on PostgreSQL and DuckDB, including error handling.
    • Marked certain integration tests to require credentials.


github-actions bot commented Oct 3, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@mb/data-json' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@mb/data-json'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.



coderabbitai bot commented Oct 3, 2025

📝 Walkthrough

Adds a single JSON column mode controlled by SqlConfig.use_single_json_column, consolidating non-internal data into _airbyte_data. Updates column generation, dataframe preparation, and file-to-table writes accordingly. Adds Postgres JSONB typing via a new type converter, a DuckDB runtime fallback for this mode, constants/imports, and integration tests validating structure and behavior.
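
As a rough sketch of the Postgres typing piece described above (only the JSONB mapping and the class/attribute names come from this walkthrough; the method shape is assumed to mirror the base converter):

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.types import TypeEngine

from airbyte.types import SQLTypeConverter


class PostgresTypeConverter(SQLTypeConverter):
    # Sketch: report JSONB as the JSON type so _airbyte_data is created as JSONB.
    @staticmethod
    def get_json_type() -> TypeEngine:
        return JSONB()


# Wiring, per the summary: the Postgres processor picks up the converter via
# its type_converter_class attribute, e.g.
#   class PostgresSqlProcessor(SqlProcessorBase):
#       type_converter_class = PostgresTypeConverter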

Changes

  • Single JSON column core (airbyte/shared/sql_processor.py, airbyte/constants.py, airbyte/datasets/_sql.py): Introduces the use_single_json_column flag; creates AB_DATA_COLUMN; adjusts column schemas and dataframe handling to pack non-internal fields into a single JSON column; enforces allowed write strategies; updates dataset column listing.
  • Postgres typing (airbyte/_processors/sql/postgres.py): Adds PostgresTypeConverter with a JSONB type for JSON data; wires it in via type_converter_class on PostgresSqlProcessor; imports JSONB.
  • DuckDB fallback (airbyte/_processors/sql/duckdb.py): In _write_files_to_new_table, short-circuits to the superclass path when use_single_json_column is enabled; otherwise uses the DuckDB native path (sketched after this list).
  • Integration tests (tests/integration_tests/test_single_json_column.py, tests/integration_tests/test_all_cache_types.py): Adds end-to-end tests for single JSON column mode across Postgres and DuckDB, including schema/type assertions and negative strategy checks; adds a requires_creds marker to an existing slow test.
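
For clarity, the DuckDB fallback above amounts to roughly the following (the class name and method signature are assumed; the real code is in airbyte/_processors/sql/duckdb.py):

from pathlib import Path

from airbyte.shared.sql_processor import SqlProcessorBase


class DuckDBSqlProcessor(SqlProcessorBase):  # illustrative subset only
    def _write_files_to_new_table(self, files: list[Path], stream_name: str, batch_id: str):
        if self.sql_config.use_single_json_column:
            # DuckDB's native file-loading path can't pack arbitrary columns into
            # one JSON value, so defer to the generic base-class path.
            return super()._write_files_to_new_table(files, stream_name, batch_id)
        ...  # otherwise, keep the existing DuckDB native path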

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant PyAirbyte as PyAirbyte API
  participant SQLProc as SQL Processor
  participant DB as SQL Backend

  rect rgba(220,235,255,0.5)
  note right of PyAirbyte: Write with use_single_json_column = true
  User->>PyAirbyte: write(dataset, config{use_single_json_column=true, strategy=APPEND})
  PyAirbyte->>SQLProc: prepare load
  SQLProc->>SQLProc: validate strategy (APPEND/AUTO only)
  SQLProc->>SQLProc: build columns: [_airbyte_data, _airbyte_id, _airbyte_extracted_at, _airbyte_meta]
  SQLProc->>SQLProc: pack non-internal fields into _airbyte_data (dict/JSON)
  SQLProc->>DB: create table if needed
  SQLProc->>DB: insert rows with JSON column
  DB-->>SQLProc: ack
  SQLProc-->>PyAirbyte: result
  PyAirbyte-->>User: processed_records
  end
sequenceDiagram
  autonumber
  participant SQLProc as DuckDB Processor
  participant Super as Base SQL Processor
  participant DuckDB as DuckDB

  alt use_single_json_column = true
    SQLProc->>Super: _write_files_to_new_table(files, ...)
    note right of Super: Use base file-to-table path
  else native path
    SQLProc->>DuckDB: read files natively
    SQLProc->>DuckDB: insert into new table
  end
sequenceDiagram
  autonumber
  participant Proc as PostgresSqlProcessor
  participant Conv as PostgresTypeConverter
  participant PG as PostgreSQL

  Proc->>Conv: get_json_type()
  Conv-->>Proc: JSONB()
  Proc->>PG: create table with _airbyte_data JSONB
  Proc->>PG: insert rows

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

enable-ai-review

Suggested reviewers

  • aaronsteers
  • quintonwall

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped - CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title clearly and concisely summarizes the primary feature addition (introducing support for a single data column in the cache) and uses conventional commit style without unnecessary detail or ambiguity.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
  • 🧪 Generate unit tests (beta)
    • Create PR with unit tests
    • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
tests/integration_tests/test_single_json_column.py (3)

114-117: Minor comment inconsistency: psycopg vs psycopg2.

The comment mentions "psycopg2" but the code uses psycopg (version 3). Would it make sense to update the comment to match the actual library being used, wdyt?

Apply this diff to correct the comment:

-                    # Verify it's a dict (psycopg2 automatically parses JSONB)
+                    # Verify it's a dict (psycopg automatically parses JSONB)

167-175: Consider parameterized queries to avoid SQL injection patterns.

The queries use f-string interpolation to embed schema and table names directly into SQL strings. While this is acceptable in a test context where inputs are controlled, it demonstrates a pattern that could be risky in production code. Would it make sense to use SQLAlchemy's parameterization or identifier quoting for better practice demonstration, wdyt?

For example:

from sqlalchemy import column, table, select

# Instead of f-strings, use SQLAlchemy constructs
columns_query = select(
    column('column_name'),
    column('data_type')
).select_from(
    table('columns', schema='information_schema')
).where(
    column('table_schema') == new_duckdb_cache.schema_name
).where(
    column('table_name') == table_name
).order_by(column('ordinal_position'))

This is more of a best-practice suggestion rather than a critical issue, since the test context is safe.

Also applies to: 199-205


234-261: Consider testing "auto" write strategy as well.

The test effectively validates that merge and replace strategies are rejected. However, the PR mentions that "auto" strategy should work (presumably by defaulting to append). Would it make sense to add a positive test case for "auto" strategy to ensure it's handled correctly in single JSON column mode, wdyt?

Something like:

# Verify that AUTO strategy works (should default to APPEND)
result = source.read(new_duckdb_cache, write_strategy="auto")
assert result.processed_records > 0
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 99d2ccf and 75b96ce.

📒 Files selected for processing (7)
  • airbyte/_processors/sql/duckdb.py (1 hunks)
  • airbyte/_processors/sql/postgres.py (3 hunks)
  • airbyte/constants.py (1 hunks)
  • airbyte/datasets/_sql.py (2 hunks)
  • airbyte/shared/sql_processor.py (6 hunks)
  • tests/integration_tests/test_all_cache_types.py (1 hunks)
  • tests/integration_tests/test_single_json_column.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
airbyte/shared/sql_processor.py (7)
airbyte/strategies.py (1)
  • WriteStrategy (18-51)
airbyte/exceptions.py (1)
  • PyAirbyteInputError (201-210)
airbyte/_processors/sql/postgres.py (2)
  • get_json_type (47-49)
  • normalize (65-67)
airbyte/types.py (2)
  • get_json_type (120-122)
  • to_sql_type (124-162)
airbyte/_processors/sql/snowflake.py (2)
  • get_json_type (198-200)
  • to_sql_type (182-195)
airbyte/shared/catalog_providers.py (1)
  • get_stream_properties (122-127)
airbyte/_util/name_normalizers.py (2)
  • normalize (23-25)
  • normalize (53-87)
airbyte/_processors/sql/postgres.py (2)
airbyte/shared/sql_processor.py (1)
  • SqlProcessorBase (179-1240)
airbyte/types.py (2)
  • SQLTypeConverter (99-162)
  • get_json_type (120-122)
airbyte/datasets/_sql.py (2)
airbyte/caches/base.py (1)
  • processor (191-193)
airbyte/shared/sql_processor.py (1)
  • sql_config (412-414)
airbyte/_processors/sql/duckdb.py (1)
airbyte/shared/sql_processor.py (2)
  • sql_config (412-414)
  • _write_files_to_new_table (883-953)
tests/integration_tests/test_single_json_column.py (6)
airbyte/caches/duckdb.py (1)
  • DuckDBCache (43-54)
airbyte/caches/postgres.py (1)
  • PostgresCache (37-61)
tests/conftest.py (3)
  • source_test_installation (289-320)
  • source_test_registry (250-272)
  • new_postgres_cache (231-246)
airbyte/datasets/_sql.py (4)
  • stream_name (87-88)
  • column_names (145-157)
  • to_pandas (109-110)
  • to_pandas (194-196)
airbyte/shared/sql_processor.py (1)
  • get_sql_connection (432-442)
airbyte/exceptions.py (1)
  • PyAirbyteInputError (201-210)
🪛 GitHub Actions: Run Linters
airbyte/shared/sql_processor.py

[error] 920-920: Mypy type check failed. airbyte/shared/sql_processor.py:920: List item 0 has incompatible type "None"; expected "str | bytes | date | timedelta | datetime64 | <8 more items>" [list-item]. Command: 'poetry run mypy .'

airbyte/_processors/sql/duckdb.py

[error] 122-122: Trailing whitespace detected (W291). This can be fixed automatically with 'ruff --fix' or by removing trailing spaces.


[error] ruff check failed with exit code 1 due to the above issues. Command: poetry run ruff check .

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (4)
tests/integration_tests/test_single_json_column.py (4)

1-26: LGTM!

The imports and fixture setup are clean and appropriate for the integration test. The autouse fixtures effectively enable the source-test connector for all tests in this module.


28-139: Well-structured PostgreSQL test.

The test thoroughly validates the single JSON column feature for PostgreSQL:

  • Correctly enables the feature and uses append-only strategy
  • Validates table structure at the SQL level (4 columns, JSONB type)
  • Verifies data integrity both directly via SQL and through the PyAirbyte API

The combination of low-level SQL inspection and high-level API validation provides good coverage.


141-231: Solid DuckDB structure test.

The test effectively validates the single JSON column feature for DuckDB:

  • Uses the cache's existing SQL connection (cleaner than manual connection)
  • Handles DuckDB-specific behavior (JSON potentially returned as string)
  • Validates structure and data integrity at both SQL and API levels

The parallel structure to the PostgreSQL test makes it easy to compare implementations.


234-261: Good negative path testing.

The test appropriately validates that incompatible write strategies (merge and replace) are rejected when single JSON column mode is enabled. The error message assertions ensure users receive helpful feedback about the restriction.
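
(As a rough sketch, not quoted from the test file, such a check could look like the following; the fixture names are assumed:)

import pytest

from airbyte.exceptions import PyAirbyteInputError


@pytest.mark.parametrize("strategy", ["merge", "replace"])
def test_rejects_incompatible_strategies(source, new_duckdb_cache, strategy):
    # source and new_duckdb_cache are assumed fixtures, with
    # use_single_json_column enabled on the cache.
    with pytest.raises(PyAirbyteInputError):
        source.read(new_duckdb_cache, write_strategy=strategy)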

Comment on lines +122 to +124

        # since the DuckDB native approach doesn't support dynamically combining columns
        # into JSON.
        if self.sql_config.use_single_json_column:

⚠️ Potential issue | 🟡 Minor

Fix trailing whitespace to satisfy Ruff.

The new comment block has trailing spaces, which is tripping Ruff’s W291 check in CI. Could we trim those so the lint job passes, wdyt?

-        # since the DuckDB native approach doesn't support dynamically combining columns 
+        # since the DuckDB native approach doesn't support dynamically combining columns
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (trailing whitespace removed):

        # since the DuckDB native approach doesn't support dynamically combining columns
        # into JSON.
        if self.sql_config.use_single_json_column:
🧰 Tools
🪛 GitHub Actions: Run Linters

[error] 122-122: Trailing whitespace detected (W291). This can be fixed automatically with 'ruff --fix' or by removing trailing spaces.

🤖 Prompt for AI Agents
In airbyte/_processors/sql/duckdb.py around lines 122 to 124, the added comment
lines contain trailing whitespace which triggers Ruff W291; remove the trailing
spaces at the ends of those comment lines (and any other trailing whitespace
nearby), save the file, and re-run the linter/CI to verify the warning is
resolved.

Comment on lines +918 to +925

                    dataframe[AB_DATA_COLUMN] = (
                        dataframe[data_columns]
                        .replace([np.nan], [None], regex=False)
                        .apply(lambda row: row.to_dict(), axis=1)
                    )
                    # Drop the original data columns
                    dataframe = dataframe.drop(columns=data_columns)
                else:

⚠️ Potential issue | 🟠 Major

Address Mypy failure in single JSON consolidation.

Mypy is failing on this block because .replace([np.nan], [None], regex=False) feeds [None] to pandas, which violates the stubbed type signature (see the CI error at Line 920). Could we switch to a where-based approach to normalize NaNs before building _airbyte_data, wdyt?

-                if data_columns:
-                    dataframe[AB_DATA_COLUMN] = (
-                        dataframe[data_columns]
-                        .replace([np.nan], [None], regex=False)
-                        .apply(lambda row: row.to_dict(), axis=1)
-                    )
+                if data_columns:
+                    sanitized = dataframe[data_columns].where(
+                        pd.notna(dataframe[data_columns]), None
+                    )
+                    dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

                if data_columns:
                    sanitized = dataframe[data_columns].where(
                        pd.notna(dataframe[data_columns]), None
                    )
                    dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1)
                    # Drop the original data columns
                    dataframe = dataframe.drop(columns=data_columns)
                else:
🧰 Tools
🪛 GitHub Actions: Run Linters

[error] 920-920: Mypy type check failed. airbyte/shared/sql_processor.py:920: List item 0 has incompatible type "None"; expected "str | bytes | date | timedelta | datetime64 | <8 more items>" [list-item]. Command: 'poetry run mypy .'

🤖 Prompt for AI Agents
In airbyte/shared/sql_processor.py around lines 918 to 925, the current use of
.replace([np.nan], [None], regex=False) causes a mypy type error because passing
[None] to replace conflicts with pandas stubs; change the normalization to a
where-based approach: compute a cleaned frame for data_columns that replaces NaN
values with None using a boolean mask (e.g., using isna()/~isna() with
where/mask) and then apply row.to_dict() on that cleaned frame to build
AB_DATA_COLUMN, then drop the original data_columns as before.

marianob-span1 (Author) replied:

I've noticed in another PR that MyPy is being replaced; I'm wondering if I should fix this anyway.
